package com.zhang.third.day09;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL example: per-window "hot items" Top-N ranking over user-behavior events.
 *
 * <p>The job declares a CSV-backed table through the filesystem connector, counts
 * {@code pv} events per product in hopping event-time windows (one hour long,
 * advancing every five minutes), ranks the products of each window by that count
 * and prints the {@value #TOP_N} highest-ranked rows of every window.
 *
 * <p>Created: 2022/4/13 17:06
 *
 * @author zhang
 */
public class Example8 {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_SOURCE_PATH =
            "file:///Users/apple/IdeaProjects/flink_1.13/src/main/resources/UserBehavior.csv";

    /** Number of top-ranked products kept per window. */
    private static final int TOP_N = 3;

    /**
     * Builds the Top-N pipeline and runs it.
     *
     * @param args optional; {@code args[0]} is the URI of the CSV input file. When it is
     *             missing or blank, {@link #DEFAULT_SOURCE_PATH} is used.
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The path ends up inside a single-quoted SQL string literal, so any single
        // quote it contains has to be doubled to keep the DDL well-formed.
        String sourcePath = resolveSourcePath(args).replace("'", "''");

        // Source table: rt is a computed event-time column derived from ts via
        // FROM_UNIXTIME, with a zero-delay watermark declared on it.
        tableEnv.executeSql("create table item ( " +
                " userId string," +
                " productId string," +
                " categoryId string," +
                " type string," +
                " ts bigint," +
                " rt as TO_TIMESTAMP(FROM_UNIXTIME(ts))," +
                " WATERMARK FOR rt AS rt - INTERVAL '0' SECOND ) " +
                " with ( " +
                " 'connector' = 'filesystem', " +
                " 'path' = '" + sourcePath + "'," +
                " 'format' = 'csv' )");

        // Step 1: group by product and hopping window, counting 'pv' events.
        Table itemCountWindow = tableEnv.sqlQuery(
                "select " +
                        " productId, " +
                        " count(productId) as cnt," +
                        " HOP_START(rt,INTERVAL '5' MINUTES,INTERVAL '1' HOURS) as windowStart," +
                        " HOP_END(rt,INTERVAL '5' MINUTES,INTERVAL '1' HOURS) as windowEnd" +
                        " from item " +
                        " where " +
                        " type = 'pv' " +
                        " group by productId,HOP(rt,INTERVAL '5' MINUTES,INTERVAL '1' HOURS)");

        // Step 2: rank the products of each window (identified by its end time) by
        // descending count. Concatenating a Table into the SQL text relies on
        // Table#toString() to supply the name the query refers to.
        Table rkTable = tableEnv.sqlQuery(
                "select " +
                        " productId," +
                        " cnt," +
                        " windowEnd," +
                        " row_number() over(partition by windowEnd order by cnt desc) as rk" +
                        " from " + itemCountWindow
        );

        // Step 3: keep only the TOP_N best-ranked rows of every window.
        Table result = tableEnv.sqlQuery(
                "select * from " + rkTable + " where rk <= " + TOP_N);

        // Step 4: print the result as a changelog stream.
        tableEnv.toChangelogStream(result).print();

        env.execute();
    }

    /** Returns the input path from {@code args[0]} if present and non-blank, else the default. */
    private static String resolveSourcePath(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_SOURCE_PATH;
        }
        String candidate = args[0];
        if (candidate == null || candidate.trim().isEmpty()) {
            return DEFAULT_SOURCE_PATH;
        }
        return candidate.trim();
    }
}
